rabbitmq amqp-client源码解读

前言

一般讲到网络通信模型,就会涉及到协议,事件驱动模型(select,poll,epoll等),以及序列化与反序列化实现机制。众所周知,在网络传输中,传输的内容都是字节流的形式,所以在网络编程中序列化与反序列化机制则是比较重要的一个模块,而在amqp-client中是没有考虑序列化与反序列化实现机制,而是将这一模块,交由应用层去实现,只负责数据的传输,所以在amqp-client的API中都是直接传输字节数组的参数。下面将围绕着AMQP协议,Socket网络编程,线程协作wait/notify等来展开说明amqp-client具体实现过程,首先在AMQP协议中有这样几个抽象的概念Connection,Channel,Command,Frame等,而amqp-client则是对AMQP协议的具体实现(如同httpclient实现了http协议一样)。其中一个Connection可以创建多个Channel,每个Channel类似一个会话,我们可以针对Channel开启事务模式,Publisher Confirms确认机制等,多个Channel共用一个TCP连接收发消息。

Connection

和JDBC中的Connection概念一样,amqp-client中一个Connection底层也是通过Socket实现的TCP连接,可以通过一个Connection创建多个Channel,多个Channel共用此Connection与服务端broker进行通信,所以各Channel在使用Connection进行消息收发时,必须要使用Connection级别的锁来控制并发数据流读写操作,防止多个Channel数据读写错乱。

在amqp-client中Connection抽象概念的具体实现类是AMQConnection

public class AMQConnection extends ShutdownNotifierComponent 
implements Connection, NetworkConnection {
/** Executor handed to the ConsumerWorkService created in initializeConsumerWorkService(). */
private final ExecutorService consumerWorkServiceExecutor;
/** Executor handed to the HeartbeatSender created in initializeHeartbeatSender(). */
private final ScheduledExecutorService heartbeatExecutor;
/** Executor taken from ConnectionParams.getShutdownExecutor(); its use is not visible in this excerpt. */
private final ExecutorService shutdownExecutor;
/** Thread that runs the MainLoop frame reader; assigned in start(). */
private Thread mainLoopThread;
/** Thread factory; this default is replaced in the constructor by params.getThreadFactory(). */
private ThreadFactory threadFactory = Executors.defaultThreadFactory();

/**
 * Builds the default client properties that tell the broker which product
 * this client is and which optional capabilities it supports.
 *
 * @return a new, mutable map holding the default client properties
 */
public static Map<String, Object> defaultClientProperties() {
    // Optional protocol features this client declares support for.
    final Map<String, Object> capabilities = new HashMap<String, Object>();
    capabilities.put("publisher_confirms", true);
    capabilities.put("exchange_exchange_bindings", true);
    capabilities.put("basic.nack", true);
    capabilities.put("consumer_cancel_notify", true);
    capabilities.put("connection.blocked", true);
    capabilities.put("authentication_failure_close", true);

    // Product identification, plus the capability table built above.
    final Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("product", LongStringHelper.asLongString("RabbitMQ"));
    properties.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
    properties.put("platform", LongStringHelper.asLongString("Java"));
    properties.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
    properties.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
    properties.put("capabilities", capabilities);
    return properties;
}

/** Consumer work service; created by initializeConsumerWorkService(), which start() calls. */
protected ConsumerWorkService _workService = null;
/** Frame handler */
private final FrameHandler _frameHandler;
/** Flag controlling the main driver loop's termination */
private volatile boolean _running = false;
/** Connection-wide exception handler */
private final ExceptionHandler _exceptionHandler;
/** Manages heart-beat sending for this connection */
private HeartbeatSender _heartbeatSender;

private final String _virtualHost;
private final Map<String, Object> _clientProperties;
private final SaslConfig saslConfig;
private final int requestedHeartbeat;
private final int requestedChannelMax;
private final int requestedFrameMax;
private final int handshakeTimeout;
private final int shutdownTimeout;
private final String username;
private final String password;
/** Listeners called back on Connection.Blocked / Connection.Unblocked (see processControlCommand). */
private final Collection<BlockedListener> blockedListeners =
new CopyOnWriteArrayList<BlockedListener>();

/** Maximum frame length, or zero if no limit is set */
private volatile int _frameMax = 0;
/** Count of socket-timeouts that have happened without any incoming frames */
private volatile int _missedHeartbeats = 0;
/** Currently-configured heart-beat interval, in seconds. 0 meaning none. */
private volatile int _heartbeat = 0;
/** Object that manages a set of channels */
private volatile ChannelManager _channelManager;
/** Saved server properties field from connection.start */
private volatile Map<String, Object> _serverProperties;

/**
 * Protected API - respond, in the driver thread, to a ShutdownSignal.
 * Hands the channel back to the channel manager, if one is currently set.
 * @param channel the channel to disconnect
 */
public final void disconnectChannel(ChannelN channel) {
    // Read the volatile field once so the null check and the call use the same manager.
    final ChannelManager manager = _channelManager;
    if (manager == null) {
        return;
    }
    manager.releaseChannelNumber(channel);
}

/**
 * Construct a new connection. This only copies the configuration out of
 * {@code params}; the protocol header is sent later by {@code start()}.
 * @param params parameters for it
 * @param frameHandler frame handler this connection reads from and writes to
 */
public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
{
    checkPreconditions();

    // Transport and error reporting.
    _frameHandler = frameHandler;
    _exceptionHandler = params.getExceptionHandler();

    // Identity.
    username = params.getUsername();
    password = params.getPassword();
    _virtualHost = params.getVirtualHost();
    saslConfig = params.getSaslConfig();
    // Private copy of the client properties map.
    _clientProperties = new HashMap<String, Object>(params.getClientProperties());

    // Values requested during tuning, and timeouts.
    requestedFrameMax = params.getRequestedFrameMax();
    requestedChannelMax = params.getRequestedChannelMax();
    requestedHeartbeat = params.getRequestedHeartbeat();
    handshakeTimeout = params.getHandshakeTimeout();
    shutdownTimeout = params.getShutdownTimeout();

    // Threading.
    consumerWorkServiceExecutor = params.getConsumerWorkServiceExecutor();
    heartbeatExecutor = params.getHeartbeatExecutor();
    shutdownExecutor = params.getShutdownExecutor();
    threadFactory = params.getThreadFactory();

    // Initial connection state.
    _channelManager = null;
    _brokerInitiatedShutdown = false;
    _inConnectionNegotiation = true;
}

/** Creates this connection's ConsumerWorkService from the configured executor, thread factory and shutdown timeout. */
private void initializeConsumerWorkService() {
    _workService = new ConsumerWorkService(
            consumerWorkServiceExecutor, threadFactory, shutdownTimeout);
}

/** Creates this connection's HeartbeatSender on top of the frame handler, heartbeat executor and thread factory. */
private void initializeHeartbeatSender() {
    _heartbeatSender = new HeartbeatSender(
            _frameHandler, heartbeatExecutor, threadFactory);
}

/**
 * Connection initialization method: creates the consumer work service and
 * heartbeat sender, sends the protocol header, then starts the MainLoop
 * reader thread. The remainder of the handshake is elided in this excerpt.
 *
 * @throws IOException if sending the header fails (the frame handler is closed first)
 * @throws TimeoutException declared by the method; the throwing code is in the elided part
 */
public void start() throws IOException, TimeoutException {
initializeConsumerWorkService();
initializeHeartbeatSender();
this._running = true;
// Make sure that the first thing we do is to send the header,
// which should cause any socket errors to show up for us, rather
// than risking them pop out in the MainLoop
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker
= new AMQChannel.SimpleBlockingRpcContinuation();

// We enqueue an RPC continuation here without sending an RPC
// request, since the protocol specifies that after sending
// the version negotiation header, the client (connection
// initiator) is to wait for a connection.start method to
// arrive.
_channel0.enqueueRpc(connStartBlocker);
try {
// The following two lines are akin to AMQChannel's
// transmit() method for this pseudo-RPC.
_frameHandler.setTimeout(handshakeTimeout);
_frameHandler.sendHeader();
} catch (IOException ioe) {
_frameHandler.close();
throw ioe;
}

// start the main loop going
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
// after this point clear-up of MainLoop is triggered by closing the frameHandler.

AMQP.Connection.Start connStart;
AMQP.Connection.Tune connTune = null;

......
.........
}

/** The special channel 0 (<i>not</i> managed by the <code><b>_channelManager</b></code>) */
private final AMQChannel _channel0 = new AMQChannel(this, 0) {
@Override public boolean processAsync(Command c) throws IOException {
// _channel0 is special: it is dedicated to handling connection-level
// events such as Connection Close/Blocked/Unblocked, so every command
// is forwarded to the connection's processControlCommand().
return getConnection().processControlCommand(c);
}
};

/**
 * Handles a command received on channel 0. While the connection is open this
 * covers Connection.Close, Connection.Blocked and Connection.Unblocked (the
 * latter two are forwarded to the registered BlockedListeners); while it is
 * not open only the close handshake is acted upon.
 *
 * @param c the inbound command
 * @return true if the command was consumed here; false if the caller should
 *         pass it to the outstanding RPC continuation
 * @throws IOException declared for the handlers invoked from here
 */
@SuppressWarnings("unused")
public boolean processControlCommand(Command c) throws IOException
{
    final Method method = c.getMethod();

    if (!isOpen()) {
        if (method instanceof AMQP.Connection.Close) {
            // Acknowledge the close; a failure to send is deliberately ignored.
            try {
                _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
            } catch (IOException ignored) { }
            return true;
        }
        if (method instanceof AMQP.Connection.CloseOk) {
            // Stop the main loop; let a waiting RPC (if any) see the reply.
            _running = false;
            return !_channel0.isOutstandingRpc();
        }
        // Ignore all others.
        return true;
    }

    if (method instanceof AMQP.Connection.Close) {
        // Connection-close callback.
        handleConnectionClose(c);
        return true;
    }

    if (method instanceof AMQP.Connection.Blocked) {
        // Connection-blocked callback.
        AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
        try {
            for (BlockedListener listener : this.blockedListeners) {
                listener.handleBlocked(blocked.getReason());
            }
        } catch (Throwable ex) {
            getExceptionHandler().handleBlockedListenerException(this, ex);
        }
        return true;
    }

    if (method instanceof AMQP.Connection.Unblocked) {
        // Connection-unblocked callback.
        try {
            for (BlockedListener listener : this.blockedListeners) {
                listener.handleUnblocked();
            }
        } catch (Throwable ex) {
            getExceptionHandler().handleBlockedListenerException(this, ex);
        }
        return true;
    }

    return false;
}

/**
 * Reader loop of the connection: repeatedly reads frames from the frame
 * handler (the TCP input stream) and dispatches each one to the channel it
 * belongs to.
 */
private class MainLoop implements Runnable {
    /**
     * Channel reader thread main loop. Reads a frame, and if it is
     * not a heartbeat frame, dispatches it to the channel it refers to.
     * Continues running until the "running" flag is set false by
     * shutdown().
     */
    @Override
    public void run() {
        try {
            while (_running) {
                Frame frame = _frameHandler.readFrame();
                if (frame != null) {
                    _missedHeartbeats = 0;
                    if (frame.type == AMQP.FRAME_HEARTBEAT) {
                        // Ignore it: we've already just reset the heartbeat counter.
                    } else {
                        if (frame.channel == 0) { // the special channel
                            // Channel number 0 is reserved for connection-level
                            // events and their callbacks.
                            _channel0.handleFrame(frame);
                        } else {
                            if (isOpen()) {
                                // If we're still _running, but not isOpen(), then we
                                // must be quiescing, which means any inbound frames
                                // for non-zero channels (and any inbound commands on
                                // channel zero that aren't Connection.CloseOk) must
                                // be discarded.
                                ChannelManager cm = _channelManager;
                                if (cm != null) {
                                    ChannelN channel;
                                    try {
                                        channel = cm.getChannel(frame.channel);
                                    } catch (UnknownChannelException e) {
                                        // The channel is no longer registered (for example it
                                        // was released while a frame for it was still in
                                        // flight). Drop this frame instead of letting the
                                        // exception reach the catch-all below, which would
                                        // shut down the whole connection.
                                        channel = null;
                                    }
                                    if (channel != null) {
                                        channel.handleFrame(frame);
                                    }
                                }
                            }
                        }
                    }
                } else {
                    // Socket timeout waiting for a frame. Maybe missed heartbeat.
                    handleSocketTimeout();
                }
            }
        } catch (EOFException ex) {
            if (!_brokerInitiatedShutdown) shutdown(null, false, ex, true);
        } catch (Throwable ex) {
            _exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this, ex);
            shutdown(null, false, ex, true);
        } finally {
            // Finally, shut down our underlying data connection.
            _frameHandler.close();
            _appContinuation.set(null);
            notifyListeners();
        }
    }
}

/**
 * Creates a channel with the given channel number.
 *
 * @param channelNumber the channel number to allocate
 * @return the new channel, or null if no channel manager is set or the
 *         manager returned null
 * @throws IOException propagated from the channel manager
 */
public Channel createChannel(int channelNumber) throws IOException {
    ensureIsOpen();
    // Read the volatile field once so the null check and the call use the same manager.
    final ChannelManager manager = _channelManager;
    return (manager == null) ? null : manager.createChannel(this, channelNumber);
}
......
.........
}

创建Connection的工厂类:ConnectionFactory

/**
 * Creates a new broker connection, trying the given addresses in order
 * until one of them succeeds.
 *
 * @param executor executor used to build the connection parameters
 * @param addrs candidate broker addresses, tried in array order
 * @return the first connection that could be established
 * @throws IOException if every address failed and the last failure was an
 *         I/O error, or if {@code addrs} is empty
 * @throws TimeoutException if every address failed and the last failure
 *         was a timeout
 */
public Connection newConnection(ExecutorService executor, Address[] addrs)
    throws IOException, TimeoutException {
    // make sure we respect the provided thread factory
    FrameHandlerFactory fhFactory = createFrameHandlerFactory();
    ConnectionParams params = params(executor);

    if (isAutomaticRecoveryEnabled()) {
        // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
        AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
        conn.init();
        return conn;
    }

    Exception lastException = null;
    for (Address addr : addrs) {
        try {
            // Walk the candidate addresses until one connection succeeds.
            FrameHandler handler = fhFactory.create(addr);
            AMQConnection conn = new AMQConnection(params, handler);
            conn.start();
            return conn;
        } catch (IOException e) {
            lastException = e;
        } catch (TimeoutException e) {
            // A timeout on one address must not stop us from trying the
            // remaining ones; remember it and move on.
            lastException = e;
        }
    }

    if (lastException instanceof TimeoutException) {
        throw (TimeoutException) lastException;
    }
    if (lastException instanceof IOException) {
        throw (IOException) lastException;
    }
    throw new IOException("failed to connect");
}

Channel

Channel只是AMQP协议中一个抽象的概念,AMQChannel是实现Channel通用逻辑的抽象类,ChannelN则是继承自AMQChannel的具体实现类。

Connection创建Channel最终都是委托给ChannelManager来创建的,核心代码如下:

/**
 * Keeps track of the channels of one connection: creates them, looks them up
 * by number and releases their numbers again. All access to the channel map
 * is done while holding {@code monitor}.
 */
public class ChannelManager {
    private final Object monitor = new Object();
    /** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
    private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();

    /**
     * Looks up a registered channel.
     *
     * @param channelNumber the channel number
     * @return the channel registered under that number
     * @throws UnknownChannelException if no channel is registered under it
     */
    public ChannelN getChannel(int channelNumber) {
        synchronized (this.monitor) {
            final ChannelN found = _channelMap.get(channelNumber);
            if (found != null) {
                return found;
            }
            throw new UnknownChannelException(channelNumber);
        }
    }

    /**
     * Creates and opens a channel with the requested number.
     *
     * @return the opened channel, or null if the number could not be reserved
     */
    public ChannelN createChannel(AMQConnection connection, int channelNumber)
        throws IOException {
        final ChannelN created;
        synchronized (this.monitor) {
            if (!channelNumberAllocator.reserve(channelNumber)) {
                return null;
            }
            created = addNewChannel(connection, channelNumber);
        }
        // Opened outside the monitor, now that it's been safely added.
        created.open();
        return created;
    }

    /**
     * Unregisters a channel and frees its number. Does nothing if the number
     * is not registered or is registered to a different channel instance.
     */
    public void releaseChannelNumber(ChannelN channel) {
        synchronized (this.monitor) {
            final int number = channel.getChannelNumber();
            final ChannelN removed = _channelMap.remove(number);
            if (removed == null) {
                // Nothing to do here. Move along.
                return;
            }
            if (removed != channel) {
                // Oops, we've gone and stomped on someone else's channel. Put it
                // back and pretend we didn't touch it.
                _channelMap.put(number, removed);
                return;
            }
            channelNumberAllocator.free(number);
        }
    }

    /** Instantiates a channel and registers it in the map; callers in this class hold {@code monitor}. */
    private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException {
        if (_channelMap.containsKey(channelNumber)) {
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
        final ChannelN fresh = instantiateChannel(connection, channelNumber, this.workService);
        _channelMap.put(fresh.getChannelNumber(), fresh);
        return fresh;
    }

    /** Factory hook for the concrete channel instance. */
    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber,
                                          ConsumerWorkService workService) {
        return new ChannelN(connection, channelNumber, workService);
    }
}

AMQChannel抽象类:

public abstract class AMQChannel extends ShutdownNotifierComponent {
/**
 * Monitor for this channel: the RPC bookkeeping (enqueueRpc / nextOutstandingRpc)
 * and the transmit methods synchronize on it, and it is also used for wait/notifyAll.
 */
protected final Object _channelMutex = new Object();

/** The connection this channel is associated with. */
private final AMQConnection _connection;

/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;

/** This channel's channel number. */
private final int _channelNumber;

/**
 * Creates a channel bound to a connection.
 * @param connection the connection this channel belongs to
 * @param channelNumber this channel's number
 */
public AMQChannel(AMQConnection connection, int channelNumber) {
    _channelNumber = channelNumber;
    _connection = connection;
}

/**
 * Feeds one inbound frame into the command currently being assembled.
 * The connection's MainLoop reads frames from the input stream and hands
 * each one to the channel it belongs to through this method.
 *
 * @param frame the inbound frame
 * @throws IOException propagated from frame or command handling
 */
public void handleFrame(Frame frame) throws IOException {
    final AMQCommand assembling = _command;
    if (!assembling.handleFrame(frame)) {
        // Command still incomplete; wait for more frames.
        return;
    }
    // a complete command has rolled off the assembly line
    _command = new AMQCommand(); // prepare for the next one
    handleCompleteInboundCommand(assembling);
}

/**
 * Routes a fully assembled inbound command: first to processAsync(), and if
 * that declines it, to the outstanding RPC continuation.
 *
 * @param command the complete inbound command
 * @throws IOException propagated from processAsync()
 */
public void handleCompleteInboundCommand(AMQCommand command) throws IOException {
    // processAsync handles asynchronous commands such as Basic.Deliver,
    // Basic.Return and Channel.Close; see ChannelN.processAsync.
    if (processAsync(command)) {
        return;
    }
    // The filter decided not to handle/consume the command,
    // so it must be some reply to an earlier RPC. Hand it to the
    // continuation of the blocked request; the caller waiting in
    // k.getReply() is woken up once the reply has been stored.
    nextOutstandingRpc().handleCommand(command);
    markRpcFinished();
}

/**
 * Gives the subclass the first chance to handle a complete inbound command.
 *
 * @param command the complete inbound command
 * @return true if the command was consumed; false if it should be passed to
 *         the outstanding RPC continuation (see handleCompleteInboundCommand)
 * @throws IOException if handling the command fails
 */
public abstract boolean processAsync(Command command) throws IOException;

/**
 * Registers {@code k} as the active RPC continuation, blocking while another
 * RPC is still outstanding. A classic wait/notify/notifyAll usage - this is
 * the "enqueue" side; nextOutstandingRpc() is the "dequeue" side.
 * An interrupt received while waiting does not abort the wait; the thread's
 * interrupt status is restored before returning.
 *
 * @param k the continuation to make active
 */
public void enqueueRpc(RpcContinuation k)
{
synchronized (_channelMutex) {
boolean waitClearedInterruptStatus = false;
// If an RPC is already active, wait until the active _activeRpc becomes null
while (_activeRpc != null) {
try {
_channelMutex.wait();// the thread releases _channelMutex and blocks in wait()
} catch (InterruptedException e) {
waitClearedInterruptStatus = true;
}
}
if (waitClearedInterruptStatus) {
Thread.currentThread().interrupt();
}
_activeRpc = k;
}
}

/**
 * Dequeue side of enqueueRpc(): takes the active RPC continuation, clears
 * the slot and wakes every thread waiting on the channel mutex.
 *
 * @return the continuation that was active, or null if there was none
 */
public RpcContinuation nextOutstandingRpc()
{
    synchronized (_channelMutex) {
        final RpcContinuation pending = _activeRpc;
        // Clearing the slot is what lets a thread blocked in enqueueRpc() proceed.
        _activeRpc = null;
        // Wake all threads blocked in _channelMutex.wait().
        _channelMutex.notifyAll();
        return pending;
    }
}

/**
 * Hook invoked by handleCompleteInboundCommand() after an RPC reply has been
 * handed to its continuation. Does nothing in this class.
 */
protected void markRpcFinished() {
// no-op
}

/**
 * Performs a blocking RPC by delegating to privateRpc(Method), which is not
 * shown in this excerpt.
 *
 * @param m the method to send
 * @return the reply command
 */
public AMQCommand rpc(Method m)
throws IOException, ShutdownSignalException
{
return privateRpc(m);
}

/**
 * Performs a blocking RPC with a reply timeout by delegating to
 * privateRpc(Method, int).
 *
 * @param m the method to send
 * @param timeout the timeout passed on to the continuation's getReply(timeout)
 * @return the reply command
 */
public AMQCommand rpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
return privateRpc(m, timeout);
}

/**
 * Sends {@code m} as an RPC and blocks for the reply, up to {@code timeout}.
 *
 * @param m the method to send
 * @param timeout the timeout passed to the continuation's getReply(timeout)
 * @return the reply command
 */
private AMQCommand privateRpc(Method m, int timeout)
    throws IOException, ShutdownSignalException, TimeoutException {
    final SimpleBlockingRpcContinuation continuation = new SimpleBlockingRpcContinuation();
    rpc(m, continuation);
    // Synchronous wait (with timeout) for the reply; for the mechanics see
    // BlockingRpcContinuation and BlockingCell.
    return continuation.getReply(timeout);
}

/**
 * Sends {@code m} as an RPC whose reply will be delivered to {@code k}.
 * Checks via ensureIsOpen() before delegating to quiescingRpc(), all under
 * the channel mutex.
 *
 * @param m the method to send
 * @param k the continuation that will receive the reply
 */
public void rpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}

/**
 * Registers the continuation and transmits the method. Unlike
 * rpc(Method, RpcContinuation), this method does not call ensureIsOpen() itself.
 *
 * @param m the method to send
 * @param k the continuation that will receive the reply
 */
public void quiescingRpc(Method m, RpcContinuation k)
throws IOException
{
synchronized (_channelMutex) {
enqueueRpc(k);// enqueue the continuation (callback)
quiescingTransmit(m); //then submit the message to the broker
}
}

/**
 * Wraps the method in an AMQCommand and transmits it while holding the
 * channel mutex.
 *
 * @param m the method to send
 */
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}

/**
 * Sends a command from this channel to the broker. A command that carries
 * content waits on the channel mutex for as long as {@code _blockContent}
 * is set; other commands are sent straight away.
 *
 * <p>An interrupt received while waiting does not abort the wait, but it is
 * no longer lost: the thread's interrupt status is restored before this
 * method returns or throws, in the same way enqueueRpc() does it.
 *
 * @param c the command to send
 * @throws IOException propagated from transmitting the command
 */
public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        boolean waitClearedInterruptStatus = false;
        try {
            if (c.getMethod().hasContent()) {
                while (_blockContent) {
                    try {
                        _channelMutex.wait();
                    } catch (InterruptedException e) {
                        // Remember the interrupt; re-asserting it right here would
                        // make the next wait() throw immediately and spin.
                        waitClearedInterruptStatus = true;
                    }

                    // This is to catch a situation when the thread wakes up during
                    // shutdown. Currently, no command that has content is allowed
                    // to send anything in a closing state.
                    ensureIsOpen();
                }
            }
            c.transmit(this);
        } finally {
            if (waitClearedInterruptStatus) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
......

ChannelN核心逻辑:

public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel {
/** The ReturnListener collection. */
private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
/** The FlowListener collection. */
private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
/** The ConfirmListener collection. */
private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();

/**
 * Handles inbound commands that are not replies to an RPC. Channel.Close is
 * always handled. While the channel is open, deliveries, returns, flow
 * control, acks/nacks, recover-ok and consumer cancellation are dispatched
 * to their handlers; while it is not open, everything except Channel.CloseOk
 * is discarded.
 *
 * @param command the complete inbound command
 * @return true if the command was consumed here; false if it must still be
 *         handled as an RPC reply
 */
@Override
public boolean processAsync(Command command) throws IOException
{
Method method = command.getMethod();
// we deal with channel.close in the same way, regardless
if (method instanceof Channel.Close) {
asyncShutdown(command);
return true;
}

if (isOpen()) {
// We're in normal running mode.
if (method instanceof Basic.Deliver) { //message delivered by the broker
processDelivery(command, (Basic.Deliver) method);
return true;
} else if (method instanceof Basic.Return) {
// invoke the ReturnListener callbacks
callReturnListeners(command, (Basic.Return) method);
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
}
callFlowListeners(command, channelFlow);
return true;
} else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
} else if (method instanceof Basic.RecoverOk) {
for (Map.Entry<String, Consumer> entry : _consumers.entrySet()) {
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
}
// Unlike all the other cases we still want this RecoverOk to
// be handled by whichever RPC continuation invoked Recover,
// so return false
return false;
} else if (method instanceof Basic.Cancel) {
Basic.Cancel m = (Basic.Cancel)method;
String consumerTag = m.getConsumerTag();
Consumer callback = _consumers.remove(consumerTag);
if (callback == null) {
callback = defaultConsumer;
}
if (callback != null) {
try {
this.dispatcher.handleCancel(callback, consumerTag);
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex, callback, consumerTag, "handleCancel");
}
}
return true;
} else {
return false;
}
} else {
// We're in quiescing mode == !isOpen()
if (method instanceof Channel.CloseOk) {
// We're quiescing, and we see a channel.close-ok:
// this is our signal to leave quiescing mode and
// finally shut down for good. Let it be handled as an
// RPC reply one final time by returning false.
return false;
} else {
// We're quiescing, and this inbound command should be
// discarded as per spec. "Consume" it by returning
// true.
return true;
}
}
}

/**
 * Hands a Basic.Deliver to the consumer registered under its consumer tag,
 * falling back to the default consumer when the tag is unknown.
 *
 * @param command the inbound command carrying content header and body
 * @param method the Basic.Deliver method of that command
 * @throws IllegalStateException if neither a registered nor a default consumer exists
 */
protected void processDelivery(Command command, Basic.Deliver method) {
    // Look up the consumer callback by consumerTag.
    Consumer callback = _consumers.get(method.getConsumerTag());
    if (callback == null) {
        if (defaultConsumer == null) {
            throw new IllegalStateException("Unsolicited delivery -" +
                    " see Channel.setDefaultConsumer to handle this" +
                    " case.");
        }
        callback = defaultConsumer;
    }

    final Envelope envelope = new Envelope(
            method.getDeliveryTag(),
            method.getRedelivered(),
            method.getExchange(),
            method.getRoutingKey());

    try {
        // The actual callback is run asynchronously; see ConsumerDispatcher.
        this.dispatcher.handleDelivery(
                callback,
                method.getConsumerTag(),
                envelope,
                (BasicProperties) command.getContentHeader(),
                command.getContentBody());
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConsumerException(
                this, ex, callback, method.getConsumerTag(), "handleDelivery");
    }
}

/**
 * Invokes every registered ReturnListener with the details of a
 * Basic.Return. Anything a listener throws is passed to the connection's
 * exception handler and stops the remaining listeners from being called.
 */
private void callReturnListeners(Command command, Basic.Return basicReturn) {
    try {
        for (ReturnListener listener : this.returnListeners) {
            listener.handleReturn(
                    basicReturn.getReplyCode(),
                    basicReturn.getReplyText(),
                    basicReturn.getExchange(),
                    basicReturn.getRoutingKey(),
                    (BasicProperties) command.getContentHeader(),
                    command.getContentBody());
        }
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
    }
}

......
..........

/**
 * Public API - {@inheritDoc}
 * Implemented as a synchronous, blocking RPC.
 */
public GetResponse basicGet(String queue, boolean autoAck)
    throws IOException
{
    validateQueueNameLength(queue);
    // Blocking RPC: returns once the broker has replied.
    final AMQCommand reply = exnWrappingRpc(new Basic.Get.Builder()
            .queue(queue)
            .noAck(autoAck)
            .build());
    final Method method = reply.getMethod();

    if (method instanceof Basic.GetEmpty) {
        return null;
    }
    if (!(method instanceof Basic.GetOk)) {
        throw new UnexpectedMethodError(method);
    }

    final Basic.GetOk getOk = (Basic.GetOk) method;
    final Envelope envelope = new Envelope(
            getOk.getDeliveryTag(),
            getOk.getRedelivered(),
            getOk.getExchange(),
            getOk.getRoutingKey());
    final BasicProperties props = (BasicProperties) reply.getContentHeader();
    final byte[] body = reply.getContentBody();
    final int messageCount = getOk.getMessageCount();
    return new GetResponse(envelope, props, body, messageCount);
}

/** Public API - {@inheritDoc} */
public void basicAck(long deliveryTag, boolean multiple)
throws IOException
{
// Sent with transmit(): fire-and-forget, no reply is awaited.
transmit(new Basic.Ack(deliveryTag, multiple));
}


/**
 * Public API - {@inheritDoc}
 * Sends Basic.Consume as an RPC and blocks until the reply has been
 * processed; the consumer is registered when the reply is transformed.
 */
public String basicConsume(String queue, boolean autoAck, String consumerTag,
                           boolean noLocal, boolean exclusive, Map<String, Object> arguments,
                           final Consumer callback) throws IOException
{
    final BlockingRpcContinuation<String> consumeOkContinuation =
            new BlockingRpcContinuation<String>() {
        public String transformReply(AMQCommand replyCommand) {
            String serverTag =
                    ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag();
            // Record the consumerTag -> consumer callback mapping.
            _consumers.put(serverTag, callback);
            dispatcher.handleConsumeOk(callback, serverTag);
            return serverTag;
        }
    };

    // Create the consumer synchronously; on success the continuation above is invoked.
    rpc(new Basic.Consume.Builder()
            .queue(queue)
            .consumerTag(consumerTag)
            .noLocal(noLocal)
            .noAck(autoAck)
            .exclusive(exclusive)
            .arguments(arguments)
            .build(),
        consumeOkContinuation);

    try {
        return consumeOkContinuation.getReply();
    } catch (ShutdownSignalException ex) {
        throw wrap(ex);
    }
}

/**
 * Public API - {@inheritDoc}
 * The publish is sent with transmit(): no reply is awaited.
 */
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    // A positive sequence number means the publish is tracked as unconfirmed.
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }

    // Fall back to minimal properties when the caller passed none.
    final BasicProperties useProps =
            (props == null) ? MessageProperties.MINIMAL_BASIC : props;

    transmit(new AMQCommand(new Basic.Publish.Builder()
                                .exchange(exchange)
                                .routingKey(routingKey)
                                .mandatory(mandatory)
                                .immediate(immediate)
                                .build(),
                            useProps, body));
}
......
..........
}

ConsumerDispatcher负责把MainLoop线程接收到的消费者相关事件(如Basic.Deliver、Basic.Cancel、Basic.RecoverOk等)提交给ConsumerWorkService,由其异步回调客户端Consumer,核心逻辑如下:

final class ConsumerDispatcher {
private final ConsumerWorkService workService;
private final AMQConnection connection;
private final Channel channel;

/**
 * Schedules the consumer's handleDelivery callback for a message received
 * from the broker. The callback runs as a work item, not on the calling
 * thread; anything it throws goes to the connection's exception handler.
 */
public void handleDelivery(final Consumer delegate,
                           final String consumerTag,
                           final Envelope envelope,
                           final AMQP.BasicProperties properties,
                           final byte[] body) throws IOException {
    final Runnable deliveryTask = new Runnable() {
        public void run() {
            try {
                delegate.handleDelivery(consumerTag, envelope, properties, body);
            } catch (Throwable ex) {
                connection.getExceptionHandler().handleConsumerException(
                        channel, ex, delegate, consumerTag, "handleDelivery");
            }
        }
    };
    executeUnlessShuttingDown(deliveryTask);
}

// 其他的handleCancel,handleCancelOk,handleConsumeOk,handleRecoverOk,handleShutdownSignal等
.....
.......

/** Submits the task through execute(), unless the shuttingDown flag is set, in which case the task is dropped. */
private void executeUnlessShuttingDown(Runnable r) {
    if (this.shuttingDown) {
        return;
    }
    execute(r);
}

/**
 * Queues the task on the work service for this dispatcher's channel.
 * checkShutdown() runs first and throws if a shutdown signal has been recorded.
 */
private void execute(Runnable r) {
checkShutdown();
this.workService.addWork(this.channel, r);
}

/** Throws the recorded shutdown signal (with its stack trace fixed up), if there is one. */
private void checkShutdown() {
    if (this.shutdownSignal == null) {
        return;
    }
    throw Utility.fixStackTrace(this.shutdownSignal);
}

Command

具体实现类是AMQCommand,在业务层消息封装成Command进行传递(Channel API参数),最终被转换成Frame,写入TCP字节输出流。

AMQCommand核心逻辑:

/**
 * A command as exchanged with the broker: a method, optionally followed by a
 * content header and body. Inbound frames are assembled through the
 * CommandAssembler; outbound commands are split into frames by transmit().
 */
public class AMQCommand implements Command {

    private final CommandAssembler assembler;

    /**
     * Feeds an inbound frame to the assembler.
     * @return the assembler's result; callers treat true as "command complete"
     */
    public boolean handleFrame(Frame f) throws IOException {
        return this.assembler.handleFrame(f);
    }

    /**
     * Submits this command to the broker: writes the method frame and, when
     * the method carries content, the content header frame followed by the
     * body split into frames no larger than the connection's frame max.
     * The connection is flushed at the end.
     *
     * @param channel the channel this command is sent on
     */
    public void transmit(AMQChannel channel) throws IOException {
        final int channelNumber = channel.getChannelNumber();
        final AMQConnection connection = channel.getConnection();

        synchronized (assembler) {
            final Method method = this.assembler.getMethod();
            connection.writeFrame(method.toFrame(channelNumber));

            if (method.hasContent()) {
                final byte[] body = this.assembler.getContentBody();

                connection.writeFrame(this.assembler.getContentHeader()
                        .toFrame(channelNumber, body.length));

                // A frame max of zero means "no limit": the body goes out in one frame.
                final int frameMax = connection.getFrameMax();
                final int bodyPayloadMax;
                if (frameMax == 0) {
                    bodyPayloadMax = body.length;
                } else {
                    bodyPayloadMax = frameMax - EMPTY_FRAME_SIZE;
                }

                int offset = 0;
                while (offset < body.length) {
                    final int remaining = body.length - offset;
                    final int fragmentLength =
                            (bodyPayloadMax <= remaining) ? bodyPayloadMax : remaining;
                    connection.writeFrame(
                            Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength));
                    offset += bodyPayloadMax;
                }
            }
        }
        connection.flush();
    }
}

Frame

Frame是AMQP协议的关键,包含了具体是如何将一个Frame转化为网络中可传输的字节流的过程,可以说Frame.writeTo和Frame.readFrom两部分是整个socket通讯协议的关键。

Frame核心逻辑:

public class Frame {
/** Frame type code */
public final int type;

/** Frame channel number, 0-65535 */
public final int channel;

/** Frame payload bytes (for inbound frames) */
private final byte[] payload;

/** Frame payload (for outbound frames) */
private final ByteArrayOutputStream accumulator;

/**
 * Builds an outbound frame: type and channel number are fixed, the payload
 * is written later into a fresh, empty accumulator.
 */
public Frame(int type, int channel) {
    this.accumulator = new ByteArrayOutputStream();
    this.payload = null;
    this.channel = channel;
    this.type = type;
}

/**
 * Builds an inbound frame from a type, a channel number and the payload
 * bytes that were read; such a frame has no accumulator.
 */
public Frame(int type, int channel, byte[] payload) {
    this.accumulator = null;
    this.payload = payload;
    this.channel = channel;
    this.type = type;
}

/**
 * Creates a body frame holding {@code length} bytes of {@code body},
 * starting at {@code offset}.
 *
 * @param channelNumber the channel the frame is sent on
 * @param body the complete message body
 * @param offset index of the first byte to copy
 * @param length number of bytes to copy
 * @return a new outbound frame of type AMQP.FRAME_BODY
 */
public static Frame fromBodyFragment(int channelNumber, byte[] body,
                                     int offset, int length)
    throws IOException
{
    final Frame fragment = new Frame(AMQP.FRAME_BODY, channelNumber);
    fragment.getOutputStream().write(body, offset, length);
    return fragment;
}

/**
 * Protected API - Factory method to instantiate a Frame by reading an
 * AMQP-wire-protocol frame from the given input stream.
 *
 * <p>The bytes are read in this order: one unsigned byte for the frame type,
 * an unsigned short for the channel number, an int for the payload size, the
 * payload itself, and one end-marker byte.
 *
 * @param is the stream to read from
 * @return a new Frame if we read a frame successfully, otherwise null
 *         (a socket timeout occurred while waiting for the first byte)
 * @throws MalformedFrameException if the payload size is negative or the
 *         end marker is not AMQP.FRAME_END
 * @throws IOException on any other read failure
 */
public static Frame readFrom(DataInputStream is) throws IOException {
    int type;
    int channel;

    try {
        type = is.readUnsignedByte();
    } catch (SocketTimeoutException ste) {
        // System.err.println("Timed out waiting for a frame.");
        return null; // failed
    }

    if (type == 'A') {
        /*
         * Probably an AMQP.... header indicating a version
         * mismatch.
         */
        /*
         * Otherwise meaningless, so try to read the version,
         * and throw an exception, whether we read the version
         * okay or not.
         */
        protocolVersionMismatch(is);
    }

    channel = is.readUnsignedShort(); // channel number the frame belongs to
    int payloadSize = is.readInt(); // payload length
    if (payloadSize < 0) {
        // The size comes straight off the wire; a negative value would
        // otherwise surface as a NegativeArraySizeException from the
        // allocation below instead of as a protocol error.
        throw new MalformedFrameException("Bad frame payload size: " + payloadSize);
    }
    byte[] payload = new byte[payloadSize];
    is.readFully(payload); // read exactly payloadSize bytes from the stream

    int frameEndMarker = is.readUnsignedByte();
    if (frameEndMarker != AMQP.FRAME_END) { // every complete frame ends with FRAME_END (206)
        throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
    }

    return new Frame(type, channel, payload);
}

/**
 * Public API - writes this Frame to the given DataOutputStream.
 * Layout: type byte, channel number (short), payload size (int), payload
 * bytes, frame-end marker.
 */
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);      // frame type, one byte
    os.writeShort(channel);  // channel number
    if (accumulator == null) {
        // No accumulator: the payload is the byte array.
        os.writeInt(payload.length);
        os.write(payload);
    } else {
        // The payload was accumulated in the buffer.
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    }
    os.write(AMQP.FRAME_END); // frame end marker
}

/**
 * Public API - retrieves a new DataInputStream streaming over the payload.
 * Each call wraps the bytes returned by getPayload() in a fresh stream.
 */
public DataInputStream getInputStream() {
    final ByteArrayInputStream payloadBytes = new ByteArrayInputStream(getPayload());
    return new DataInputStream(payloadBytes);
}

/**
 * Public API - retrieves a fresh DataOutputStream streaming into the accumulator.
 * Everything written to the returned stream is appended to this frame's accumulator.
 */
public DataOutputStream getOutputStream() {
    final DataOutputStream sink = new DataOutputStream(accumulator);
    return sink;
}
.....
.......
}

FrameHandlerFactory核心逻辑:

public class FrameHandlerFactory {
private final int connectionTimeout;
private final SocketFactory factory;
private final SocketConfigurator configurator;
private final boolean ssl;

/**
 * @param connectionTimeout timeout passed to Socket.connect()
 * @param factory factory used to create the sockets
 * @param configurator applied to each socket before it is connected
 * @param ssl passed to ConnectionFactory.portOrDefault() when resolving the port
 */
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory,
                           SocketConfigurator configurator, boolean ssl) {
    this.ssl = ssl;
    this.configurator = configurator;
    this.factory = factory;
    this.connectionTimeout = connectionTimeout;
}

/**
 * Opens a socket to the given address and wraps it in a FrameHandler.
 * If an IOException occurs, the socket (if one was created) is closed
 * quietly and the exception is rethrown.
 *
 * @param addr the broker address
 * @return a frame handler over the connected socket
 * @throws IOException if creating, configuring or connecting the socket fails
 */
public FrameHandler create(Address addr) throws IOException {
    final String host = addr.getHost();
    final int port = ConnectionFactory.portOrDefault(addr.getPort(), ssl);

    Socket socket = null;
    try {
        socket = factory.createSocket();
        configurator.configure(socket);
        final InetSocketAddress endpoint = new InetSocketAddress(host, port);
        socket.connect(endpoint, connectionTimeout);
        return create(socket);
    } catch (IOException ioe) {
        quietTrySocketClose(socket);
        throw ioe;
    }
}

/**
 * Wraps the given socket in a SocketFrameHandler.
 *
 * @param sock the socket to wrap
 * @return a new SocketFrameHandler over {@code sock}
 * @throws IOException propagated from the SocketFrameHandler constructor
 */
public FrameHandler create(Socket sock) throws IOException
{
return new SocketFrameHandler(sock);
}

/** Closes the socket if it is non-null, ignoring any exception from close(). */
private static void quietTrySocketClose(Socket socket) {
    if (socket == null) {
        return;
    }
    try {
        socket.close();
    } catch (Exception ignored) {
        /*ignore exceptions*/
    }
}
......
.........
}

SocketFrameHandler核心逻辑:

public class SocketFrameHandler implements FrameHandler {
/** The underlying socket */
private final Socket _socket;

/** Socket's inputstream - data from the broker - synchronized on */
private final DataInputStream _inputStream;

/** Socket's outputstream - data to the broker - synchronized on */
private final DataOutputStream _outputStream;

/** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1;

/**
 * Wraps the socket's streams in buffered data streams.
 * @param socket the socket to use
 * @throws IOException if the socket's input or output stream cannot be obtained
 */
public SocketFrameHandler(Socket socket) throws IOException {
    _socket = socket;

    _inputStream = new DataInputStream(
            new BufferedInputStream(_socket.getInputStream()));
    _outputStream = new DataOutputStream(
            new BufferedOutputStream(_socket.getOutputStream()));
}

.....
......
......

/**
 * Reads the next frame from the socket's input stream.
 *
 * @return the frame, or null when Frame.readFrom() returns null
 */
public Frame readFrame() throws IOException {
synchronized (_inputStream) { //input lock: keeps concurrent readers of the input stream from interleaving their reads
return Frame.readFrom(_inputStream);
}
}

/**
 * Writes one frame to the socket's (buffered) output stream. The data is
 * not flushed here; see flush().
 *
 * @param frame the frame to write
 */
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) { //output lock: keeps frames written concurrently by several channels from interleaving
frame.writeTo(_outputStream);
}
}

/** Flushes the output stream, pushing buffered frame data towards the socket. */
public void flush() throws IOException {
_outputStream.flush();
}

/**
 * Best-effort close: sets the linger timeout, flushes pending output and
 * closes the socket. Each step is attempted independently and any exception
 * it throws is ignored.
 */
@SuppressWarnings("unused")
public void close() {
    try {
        _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT);
    } catch (Exception ignored) {
    }
    try {
        flush();
    } catch (Exception ignored) {
    }
    try {
        _socket.close();
    } catch (Exception ignored) {
    }
}
}

BlockingCell

BlockingCell是实现amqp-client同步阻塞等待请求broker服务的关键,比如:basic.get手动从queue中获取消息,以及Connection建立时与broker握手交互的通信等操作都是同步阻塞等待请求,那么如何让获取响应结果的线程阻塞,直到结果返回或请求超时才继续向下执行呢,下面将详细说明其实现原理:

BlockingCell类核心逻辑:

public class BlockingCell<T> {
/** Indicator of not-yet-filledness */
private boolean _filled = false; // whether the request has already received its response

/** Will be null until a value is supplied, and possibly still then. */
private T _value; // the response value

/**
* Wait for a value, and when one arrives, return it (without clearing it).
* If there's already a value present, there's no need to wait
* - the existing value is returned.
* @return the waited-for value
* @throws InterruptedException if this thread is interrupted
*/
public synchronized T get() throws InterruptedException {
while (!_filled) { // keep the caller blocked for as long as no response has arrived
wait();
}
return _value;
}

/**
 * Wait for a value, and when one arrives, return it (without clearing it). If there's
 * already a value present, there's no need to wait - the existing value is returned.
 * If timeout is reached and value hasn't arrived, TimeoutException is thrown.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()}, so changes to
 * the wall clock do not shorten or lengthen the wait, and very large
 * timeouts do not overflow into an immediate timeout.
 *
 * @param timeout timeout in milliseconds. -1 effectively means infinity
 * @return the waited-for value
 * @throws InterruptedException if this thread is interrupted
 * @throws TimeoutException if no value arrived within the timeout
 */
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
    if (timeout == INFINITY) return get();

    if (timeout < 0)
        throw new AssertionError("Timeout cannot be less than zero");

    final long startNanos = System.nanoTime();
    long remainingMillis = timeout;
    // remainingMillis is strictly positive inside the loop, so wait() is
    // never called with 0 (which would mean "wait forever").
    while (!_filled && remainingMillis > 0) {
        wait(remainingMillis); // wait() bounded by the remaining response timeout
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
        remainingMillis = timeout - elapsedMillis;
    }

    if (!_filled)
        throw new TimeoutException();

    return _value;
}

/**
 * As get(), but catches InterruptedException and keeps retrying until a
 * value appears. An interrupt received while waiting is not lost: the
 * thread's interrupt status is restored before returning.
 * @return the waited-for value
 */
public synchronized T uninterruptibleGet() {
    boolean wasInterrupted = false;
    try {
        while (true) {
            try {
                return get();
            } catch (InterruptedException ex) {
                // Keep waiting, but remember the interrupt so it can be
                // re-asserted once the value is available.
                wasInterrupted = true;
            }
        }
    } finally {
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

......
.........

/**
* Store a value in this BlockingCell, throwing AssertionError if the cell already has a value.
* @param newValue the new value to store
*/
public synchronized void set(T newValue) {
if (_filled) { // a value may be set only once; a second attempt is an error
throw new AssertionError("BlockingCell can only be set once");
}
_value = newValue; // store the response
_filled = true; // mark the cell as filled
notifyAll();// wake the threads blocked in wait() inside get(); they can now read the response
// a good example of thread cooperation built on wait/notify/notifyAll
}

}

总结

  1. Connection中MainLoop循环从TCP输入流中读取Frame

    AMQConnection -> MainLoop循环从socket中读取Frame,然后根据读取Frame的channel编号分发给指定AMQChannel.handleFrame()处理。

  2. 异步流程:消费者异步通知(例如:Consumer中Basic.Deliver, Basic.Return and Channel.Close等逻辑)

    AMQChannel -> handleFrame() -> handleCompleteInboundCommand() -> processAsync() ;

  3. 同步流程:basic.get等(手动从broker拉取消息,同步阻塞流程)

    AMQChannel -> handleFrame() -> handleCompleteInboundCommand() -> nextOutstandingRpc().handleCommand(command) -> RpcContinuation.handleCommand(command) -> BlockingCell.set(T newValue);

  4. 数据写入输出流过程

    AMQChannel.transmit -> AMQCommand.transmit -> AMQConnection.writeFrame -> FrameHandler.writeFrame -> Frame.writeTo

    其中FrameHandler:负责读写Frame的包装类,最终还是交由Frame自身完成读取和写入(Frame.readFrom / Frame.writeTo),具体实现有SocketFrameHandler。